package me.info.sboot.app.holder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.GeoDistanceQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;

import com.alibaba.fastjson.JSON;

import lombok.extern.slf4j.Slf4j;
import me.info.common.Jsons;

/**
 * Convenience wrapper around the Elasticsearch {@link RestHighLevelClient} that bundles common
 * index, document, bulk and search operations.
 *
 * <p>Parts of this class are adapted from
 * https://blog.csdn.net/zzqaaasss/article/details/104241053
 *
 * @author Amber
 * @version v0.0.1
 * @date 2020/12/21 11:03
 */
@Slf4j
@Configuration
public class EsTemplate {

  final RestHighLevelClient rhlClient;

  public EsTemplate(@Qualifier("rhlClient") RestHighLevelClient rhlClient) {
    this.rhlClient = rhlClient;
  }

  /**
   * Checks whether an index exists.
   *
   * @param index index name
   * @return {@code true} if the client reports that the index exists
   * @throws IOException if the request to Elasticsearch fails
   */
  public boolean existsIndex(String index) throws IOException {
    return rhlClient.indices().exists(new GetIndexRequest(index), RequestOptions.DEFAULT);
  }

  /**
   * Creates an index with the default settings (3 shards, 2 replicas) and no explicit mapping.
   *
   * @param index index name
   * @return {@code true} if the index was created; {@code false} if it already exists or the
   *     creation was not acknowledged
   * @throws IOException if the request to Elasticsearch fails
   */
  public boolean createIndex(String index) throws IOException {
    // No mapping is sent: the index name is not a mapping document, so it must not be passed as
    // the mapping source.
    return doCreateIndex(index, null);
  }

  /**
   * Creates an index with the default settings (3 shards, 2 replicas) and the given mapping.
   *
   * @param index index name
   * @param mappingBuilder mapping definition; when {@code null} no mapping is sent
   * @return {@code true} if the index was created; {@code false} if it already exists or the
   *     creation was not acknowledged
   * @throws IOException if the request to Elasticsearch fails
   */
  public boolean createIndex(String index, XContentBuilder mappingBuilder)
      throws IOException {
    return doCreateIndex(index, mappingBuilder);
  }

  /** Shared implementation of both {@code createIndex} overloads. */
  private boolean doCreateIndex(String index, XContentBuilder mappingBuilder)
      throws IOException {
    if (existsIndex(index)) {
      log.error("index is exist!");
      return false;
    }
    CreateIndexRequest request = new CreateIndexRequest(index);
    request.settings(
        Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 2));
    if (mappingBuilder != null) {
      request.mapping(mappingBuilder);
    }

    CreateIndexResponse response = rhlClient.indices().create(request, RequestOptions.DEFAULT);
    // isShardsAcknowledged: whether the required number of shard copies was started for every
    // shard of the index before the timeout.
    if (response.isAcknowledged() || response.isShardsAcknowledged()) {
      log.info("create index[{}] success.", index);
      return true;
    }
    return false;
  }

  /**
   * Checks whether a document with the given id exists in an index.
   *
   * @param index index name
   * @param id document id
   * @return {@code true} if the document exists
   * @throws IOException if the request to Elasticsearch fails
   */
  public boolean docExists(String index, String id) throws IOException {
    return existsById(index, id);
  }

  /**
   * Partially updates a document by id; the object is serialized to JSON and sent as the update
   * document.
   *
   * @param object data to update
   * @param index index name
   * @param id document id
   * @throws IOException if the request to Elasticsearch fails
   */
  public void updateDataById(Object object, String index, String id) throws IOException {
    updateDoc(index, id, object);
  }

  /**
   * Checks whether a document with the given id exists in an index.
   *
   * @param index index name
   * @param id document id
   * @return {@code true} if the document exists
   * @throws IOException if the request to Elasticsearch fails
   */
  public boolean existsById(String index, String id) throws IOException {
    GetRequest request = new GetRequest(index, id);
    // Only existence matters here, so neither _source nor stored fields are requested.
    request.fetchSourceContext(new FetchSourceContext(false));
    request.storedFields("_none_");
    return rhlClient.exists(request, RequestOptions.DEFAULT);
  }

  /**
   * Indexes a document under the given id.
   *
   * @param index index name
   * @param id document id
   * @param t entity to store; serialized to JSON
   * @return {@code true} if the response status is {@code OK} or {@code CREATED}
   * @throws IOException if the request to Elasticsearch fails
   */
  public <T> boolean addDoc(String index, String id, T t) throws IOException {
    RestStatus status = indexDoc(index, id, t).status();
    return status == RestStatus.OK || status == RestStatus.CREATED;
  }

  /**
   * Indexes a document under a caller-supplied id.
   *
   * @param object data to store; serialized to JSON
   * @param index index name
   * @param id document id; when {@code null}, Elasticsearch generates one
   * @return the id of the indexed document as reported by the response
   * @throws IOException if the request to Elasticsearch fails
   */
  public String addData(Object object, String index, String id) throws IOException {
    return indexDoc(index, id, object).getId();
  }

  /**
   * Indexes a document under a random id (an upper-case UUID without dashes).
   *
   * @param object data to store; serialized to JSON
   * @param index index name
   * @return the id of the indexed document as reported by the response
   * @throws IOException if the request to Elasticsearch fails
   */
  public String addData(Object object, String index) throws IOException {
    return addData(object, index, UUID.randomUUID().toString().replaceAll("-", "").toUpperCase());
  }

  /** Builds and sends a single index request with a one-second timeout. */
  private IndexResponse indexDoc(String index, String id, Object source) throws IOException {
    IndexRequest request = new IndexRequest(index);
    request.id(id);
    request.timeout(TimeValue.timeValueSeconds(1));
    request.source(JSON.toJSONString(source), XContentType.JSON);
    return rhlClient.index(request, RequestOptions.DEFAULT);
  }

  /**
   * Fetches a document by id.
   *
   * @param index index name
   * @param id document id
   * @return the raw get response
   * @throws IOException if the request to Elasticsearch fails
   */
  public GetResponse getDoc(String index, String id) throws IOException {
    return rhlClient.get(new GetRequest(index, id), RequestOptions.DEFAULT);
  }

  /**
   * Bulk-indexes a list of documents. No ids are set on the individual requests, so Elasticsearch
   * generates them.
   *
   * @param index index name
   * @param list documents to store; each element is serialized with {@code Jsons.json}
   * @return {@code true} if no item failed; also {@code true} when the list is {@code null} or
   *     empty, in which case no request is sent
   * @throws IOException if the request to Elasticsearch fails
   */
  public <T> boolean addBulk(String index, List<T> list) throws IOException {
    if (list == null || list.isEmpty()) {
      // An empty bulk request is rejected by request validation; there is nothing to do.
      return true;
    }
    BulkRequest bulkRequest = new BulkRequest();
    bulkRequest.timeout(TimeValue.timeValueMinutes(2));
    for (T element : list) {
      bulkRequest.add(new IndexRequest(index).source(Jsons.json(element), XContentType.JSON));
    }
    BulkResponse bulkResponse = rhlClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    return !bulkResponse.hasFailures();
  }

  /**
   * Bulk-indexes a list of documents, reporting I/O errors as a {@code false} result instead of
   * throwing.
   *
   * @param index index name
   * @param objects documents to store (the original author notes a limit of 200,000 items);
   *     each element is serialized with fastjson
   * @return {@code true} if no item failed; {@code false} on item failures or an I/O error;
   *     {@code true} when the list is {@code null} or empty, in which case no request is sent
   */
  public boolean bulkPost(String index, List<?> objects) {
    if (objects == null || objects.isEmpty()) {
      // An empty bulk request is rejected by request validation; there is nothing to do.
      return true;
    }
    BulkRequest bulkRequest = new BulkRequest();
    for (Object object : objects) {
      IndexRequest request = new IndexRequest(index);
      request.source(JSON.toJSONString(object), XContentType.JSON);
      bulkRequest.add(request);
    }
    try {
      BulkResponse response = rhlClient.bulk(bulkRequest, RequestOptions.DEFAULT);
      return !response.hasFailures();
    } catch (IOException e) {
      log.error("bulk index into [{}] failed", index, e);
      return false;
    }
  }

  /**
   * Finds documents whose {@code location} field lies within a given distance of a point.
   *
   * @param index index name
   * @param longitude longitude of the centre point
   * @param latitude latitude of the centre point
   * @param distance radius around the centre point, in kilometres
   * @return the raw search response, or {@code null} when either coordinate is {@code null}
   * @throws IOException if the request to Elasticsearch fails
   */
  public SearchResponse geoDistanceQuery(
      String index, Float longitude, Float latitude, String distance) throws IOException {
    if (longitude == null || latitude == null) {
      return null;
    }
    // Circle around the given point; note the (lat, lon) argument order of point().
    GeoDistanceQueryBuilder withinDistance =
        new GeoDistanceQueryBuilder("location")
            .point(latitude, longitude)
            .distance(distance, DistanceUnit.KILOMETERS);
    BoolQueryBuilder query = QueryBuilders.boolQuery().filter(withinDistance);

    SearchRequest searchRequest = new SearchRequest(index);
    searchRequest.source(new SearchSourceBuilder().query(query));
    return rhlClient.search(searchRequest, RequestOptions.DEFAULT);
  }

  // Batch update is not provided; it can be written along the lines of the bulk methods above.

  /**
   * Partially updates a document by id; the entity is serialized to JSON and sent as the update
   * document.
   *
   * @param index index name
   * @param id document id
   * @param t entity holding the fields to update
   * @return {@code true} if the response status is {@code OK}
   * @throws IOException if the request to Elasticsearch fails
   */
  public <T> boolean updateDoc(String index, String id, T t) throws IOException {
    UpdateRequest request = new UpdateRequest(index, id);
    // The content type must be given explicitly: the single-argument doc(...) call resolves to
    // doc(Object...), which expects field/value pairs rather than a JSON string.
    request.doc(JSON.toJSONString(t), XContentType.JSON);
    request.timeout(TimeValue.timeValueSeconds(1));
    UpdateResponse updateResponse = rhlClient.update(request, RequestOptions.DEFAULT);
    return updateResponse.status() == RestStatus.OK;
  }

  /**
   * Deletes several documents by id in one bulk request.
   *
   * @param index index name
   * @param ids ids of the documents to delete
   * @return {@code true} if no item failed; also {@code true} when no ids are given, in which
   *     case no request is sent
   * @throws IOException if the request to Elasticsearch fails
   */
  public boolean deleteBatch(String index, String... ids) throws IOException {
    if (ids == null || ids.length == 0) {
      // An empty bulk request is rejected by request validation; there is nothing to do.
      return true;
    }
    BulkRequest request = new BulkRequest();
    for (String id : ids) {
      request.add(new DeleteRequest(index, id));
    }
    BulkResponse response = rhlClient.bulk(request, RequestOptions.DEFAULT);
    // The bulk-level status does not reflect per-item failures, so check the items instead.
    return !response.hasFailures();
  }

  /**
   * Deletes all documents of an index that match a query. Version conflicts do not abort the
   * operation ({@code conflicts=proceed}).
   *
   * @param index index name
   * @param builder query selecting the documents to delete
   * @throws RuntimeException wrapping any exception raised by the client
   */
  public void deleteByQuery(String index, QueryBuilder builder) {
    DeleteByQueryRequest request = new DeleteByQueryRequest(index);
    request.setQuery(builder);
    // Batch size per scroll; the original author notes 10000 as the maximum.
    request.setBatchSize(10000);
    request.setConflicts("proceed");
    try {
      rhlClient.deleteByQuery(request, RequestOptions.DEFAULT);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Deletes a single document by id.
   *
   * @param index index name
   * @param id document id
   * @return {@code true} if the response status is {@code OK}
   * @throws IOException if the request to Elasticsearch fails
   */
  public boolean deleteDocById(String index, String id) throws IOException {
    DeleteRequest request = new DeleteRequest(index, id);
    request.timeout(TimeValue.timeValueSeconds(1));
    DeleteResponse deleteResponse = rhlClient.delete(request, RequestOptions.DEFAULT);
    return deleteResponse.status() == RestStatus.OK;
  }

  /**
   * Deletes an index.
   *
   * @param index index name
   * @return {@code true} if the deletion was acknowledged; {@code false} if the index does not
   *     exist or the deletion was not acknowledged
   * @throws IOException if the request to Elasticsearch fails
   */
  public boolean deleteIndex(String index) throws IOException {
    if (!existsIndex(index)) {
      log.error("index is not exist!");
      return false;
    }
    DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index);
    deleteIndexRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);

    AcknowledgedResponse response =
        rhlClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
    return response.isAcknowledged();
  }

  /**
   * Runs a term query on one field and logs the hits as JSON. Nothing is returned to the caller.
   *
   * @param index index name
   * @param field field to match
   * @param key term to search for
   * @param from offset of the first hit; when {@code null} the builder default is kept
   * @param size maximum number of hits; when {@code null} the builder default is kept
   * @throws IOException if the request to Elasticsearch fails
   */
  public void search(String index, String field, String key, Integer from, Integer size)
      throws IOException {
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    sourceBuilder.query(QueryBuilders.termQuery(field, key));
    if (from != null) {
      sourceBuilder.from(from);
    }
    if (size != null) {
      sourceBuilder.size(size);
    }
    // Upper bound on the time spent searching.
    sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));

    SearchRequest searchRequest = new SearchRequest(index);
    searchRequest.source(sourceBuilder);
    SearchResponse searchResponse = rhlClient.search(searchRequest, RequestOptions.DEFAULT);
    log.info("{}", JSON.toJSONString(searchResponse.getHits()));
  }

  /**
   * Executes a fully prepared search request.
   *
   * @param req search request
   * @return the raw search response
   * @throws IOException if the request to Elasticsearch fails
   */
  public SearchResponse search(SearchRequest req) throws IOException {
    return rhlClient.search(req, RequestOptions.DEFAULT);
  }

  /**
   * Converts search hits into source maps, replacing the value of the highlighted field with its
   * concatenated highlight fragments where a highlight is present.
   *
   * <p>A map can be turned into an object with
   * {@code JSONObject.parseObject(JSONObject.toJSONString(map), Content.class)}.
   *
   * @param searchResponse response whose hits are converted
   * @param highlightField name of the highlighted field
   * @return one entry per hit, in hit order; an entry is {@code null} when the hit has no source
   */
  private List<Map<String, Object>> setSearchResponse(
      SearchResponse searchResponse, String highlightField) {
    SearchHit[] hits = searchResponse.getHits().getHits();
    List<Map<String, Object>> rows = new ArrayList<>(hits.length);
    for (SearchHit hit : hits) {
      Map<String, Object> source = hit.getSourceAsMap();
      HighlightField highlighted = hit.getHighlightFields().get(highlightField);
      if (highlighted != null && source != null) {
        StringBuilder merged = new StringBuilder();
        for (Text fragment : highlighted.fragments()) {
          merged.append(fragment);
        }
        // Overwrite the plain value with the highlighted one.
        source.put(highlightField, merged.toString());
      }
      rows.add(source);
    }
    return rows;
  }

  /**
   * Fetches the source of a document by id.
   *
   * @param index index name
   * @param id document id
   * @param fields comma-separated list of fields to return; all fields when empty
   * @return the document source as returned by the get response
   * @throws IOException if the request to Elasticsearch fails
   */
  public Map<String, Object> searchDataById(String index, String id, String fields)
      throws IOException {
    GetRequest request = new GetRequest(index, id);
    if (StringUtils.isNotEmpty(fields)) {
      // Restrict _source to the requested fields.
      request.fetchSourceContext(
          new FetchSourceContext(true, fields.split(","), Strings.EMPTY_ARRAY));
    }
    return rhlClient.get(request, RequestOptions.DEFAULT).getSource();
  }

  /**
   * Runs a paged search and returns the hits as source maps with the highlight applied.
   *
   * <p>Note that the passed-in {@code query} builder is modified in place (source filtering,
   * paging, sorting and highlighting are set on it).
   *
   * @param index index name
   * @param query search source holding the query
   * @param size page size; when {@code null} the paging already set on {@code query} is kept
   * @param from zero-based page index; {@code null} or non-positive values select the first page
   * @param fields comma-separated list of fields to return; all fields when empty
   * @param sortField field to sort by (ascending); {@code .keyword} is appended to the name, so
   *     this is intended for text fields with a keyword sub-field
   * @param highlightField field to highlight; no highlighter is added when empty
   * @return the hits as maps, or {@code null} if the response status is not 200
   * @throws IOException if the request to Elasticsearch fails
   */
  public List<Map<String, Object>> searchListData(
      String index,
      SearchSourceBuilder query,
      Integer size,
      Integer from,
      String fields,
      String sortField,
      String highlightField)
      throws IOException {
    SearchSourceBuilder builder = query;
    if (StringUtils.isNotEmpty(fields)) {
      // Restrict _source to the requested fields.
      builder.fetchSource(new FetchSourceContext(true, fields.split(","), Strings.EMPTY_ARRAY));
    }
    if (size != null) {
      int pageIndex = (from == null || from <= 0) ? 0 : from;
      // from(...) takes a document offset, so convert the page index.
      builder.from(pageIndex * size);
      builder.size(size);
    }
    if (StringUtils.isNotEmpty(sortField)) {
      builder.sort(sortField + ".keyword", SortOrder.ASC);
    }
    if (StringUtils.isNotEmpty(highlightField)) {
      HighlightBuilder highlight = new HighlightBuilder();
      highlight.field(highlightField);
      highlight.requireFieldMatch(false);
      highlight.preTags("<span style='color:red'>");
      highlight.postTags("</span>");
      builder.highlighter(highlight);
    }

    SearchRequest request = new SearchRequest(index);
    request.source(builder);
    SearchResponse response = rhlClient.search(request, RequestOptions.DEFAULT);
    log.info("=={}", response.getHits().getTotalHits());
    if (response.status().getStatus() == 200) {
      return setSearchResponse(response, highlightField);
    }
    return null;
  }

  /**
   * Runs a search and deserializes the source of every hit into the given type.
   *
   * @param index index name
   * @param builder search source to execute
   * @param c target type for each hit
   * @return the deserialized hits, in hit order
   * @throws RuntimeException wrapping any exception raised while searching or deserializing
   */
  public <T> List<T> search(String index, SearchSourceBuilder builder, Class<T> c) {
    SearchRequest request = new SearchRequest(index);
    request.source(builder);
    try {
      SearchHit[] hits = rhlClient.search(request, RequestOptions.DEFAULT).getHits().getHits();
      List<T> results = new ArrayList<>(hits.length);
      for (SearchHit hit : hits) {
        results.add(JSON.parseObject(hit.getSourceAsString(), c));
      }
      return results;
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Runs a query against one or more indices.
   *
   * @param queryBuilder query to execute
   * @param timeout search timeout in seconds
   * @param indecis names of the indices to search
   * @return the raw search response
   * @throws IOException if the request to Elasticsearch fails
   */
  public SearchResponse search(QueryBuilder queryBuilder, long timeout, String... indecis)
      throws IOException {
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    sourceBuilder.query(queryBuilder).timeout(TimeValue.timeValueSeconds(timeout));

    SearchRequest req = new SearchRequest(indecis);
    req.source(sourceBuilder);
    return rhlClient.search(req, RequestOptions.DEFAULT);
  }

  /**
   * Runs a paged query against one or more indices.
   *
   * @param queryBuilder query to execute
   * @param timeout search timeout in seconds; when {@code null} no timeout is set
   * @param page zero-based page index; negative values select the first page
   * @param pageSize number of hits per page
   * @param indecis names of the indices to search
   * @return the raw search response
   * @throws IOException if the request to Elasticsearch fails
   */
  public SearchResponse searchPage(
      BoolQueryBuilder queryBuilder, Long timeout, int page, int pageSize, String... indecis)
      throws IOException {
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    sourceBuilder.query(queryBuilder);
    if (timeout != null) {
      sourceBuilder.timeout(TimeValue.timeValueSeconds(timeout));
    }
    // from(...) is a document offset (like the offset of a SQL LIMIT), not a page number.
    sourceBuilder.from(Math.max(page, 0) * pageSize).size(pageSize);

    SearchRequest req = new SearchRequest(indecis);
    req.source(sourceBuilder);
    return rhlClient.search(req, RequestOptions.DEFAULT);
  }
}
